package org.adam2.chamuel.sink.collector;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.adam2.chamuel.entity.StockRecord;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class StockRecordCollectorSink extends AbstractSink implements Configurable {
    private static JedisPoolConfig poolConfig;
    private static Set<HostAndPort> nodes;
    private static JedisCluster cluster;
    private static Map<String, String> map = new HashMap<String, String>();;
    private static JedisClusterPipeline jedisClusterPipeline;

    private static JSONArray resultJsonArray=new JSONArray();

    public static JedisClusterPipeline getJedisClusterPipeline() {
        if(null == poolConfig){
            poolConfig = new JedisPoolConfig();
            // 最大连接数
            poolConfig.setMaxTotal(1);
            // 最大空闲数
            poolConfig.setMaxIdle(1);
            // 最大允许等待时间，如果超过这个时间还未获取到连接，则会报JedisException异常：
            // Could not get a resource from the pool
            poolConfig.setMaxWaitMillis(120000);
        }

        if(null == nodes){
            nodes = new LinkedHashSet<HostAndPort>();
            nodes.add(new HostAndPort("192.168.23.131", 7000));
            nodes.add(new HostAndPort("192.168.23.131", 7001));
            nodes.add(new HostAndPort("192.168.23.131", 7002));
            nodes.add(new HostAndPort("192.168.23.130", 7003));
            nodes.add(new HostAndPort("192.168.23.130", 7004));
            nodes.add(new HostAndPort("192.168.23.130", 7005));
        }

        if(null == cluster){
            cluster = new JedisCluster(nodes, poolConfig);
        }

        if(null == jedisClusterPipeline) {
            jedisClusterPipeline = JedisClusterPipeline.pipelined(cluster);
            jedisClusterPipeline.refreshCluster();
        }

        return jedisClusterPipeline;
    }

    public Status process() throws EventDeliveryException {
        Transaction trans = null;
        try {
            Channel channel = getChannel();
            trans = channel.getTransaction();
            trans.begin();
            Event event = channel.take();
            if (event == null) {
                trans.rollback();
                return Status.BACKOFF;
            } else {
                String jsonString = new String(event.getBody());
                if(null != jsonString && jsonString.length() > 0){
                	JSONArray jsonArray=JSON.parseArray(jsonString);
                    System.out.println("resultJsonArray.size():  "+resultJsonArray.size());
                    if(null!=resultJsonArray && resultJsonArray.size()<1000){
                        resultJsonArray.addAll(jsonArray);
                    }else {
                        for(int i=0; i</*jsonArray*/resultJsonArray.size(); i++){
                            String jsonStr=/*jsonArray*/resultJsonArray.get(i).toString();
                            StockRecord stockRecord=StockRecord.toEntity(JSON.parseObject(jsonStr));
                            stockRecord.setId(UUID.randomUUID().toString().replace("-", ""));
                            String stockRecordString=JSON.toJSONString(stockRecord);
                            //System.out.println("sink:  "+stockRecordString);
                            try{
                            	getJedisClusterPipeline().set(stockRecord.getId(), stockRecordString);
                            }catch(Exception e){
                            	
                            }
                        }
                        resultJsonArray.clear();
                        getJedisClusterPipeline().sync();
                    }                    
                }
                //System.out.println("sink:  commit");
                trans.commit();
            }
        } catch (Exception e) {
            trans.rollback();
            e.printStackTrace();
        } finally {
            trans.close();
        }
        return Status.BACKOFF;
    }

    public void configure(Context context) {

    }
}
